-
Notifications
You must be signed in to change notification settings - Fork 99
Add a scheduler that ensures single batch of changes from live query due to a transaction that touches multiple source collections #628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: samwillis/fix-self-join
Are you sure you want to change the base?
Conversation
🦋 Changeset detectedLatest commit: 4d5ff08 The changes in this PR will be included in the next version bump. This PR includes changesets to release 12 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
More templates
@tanstack/angular-db
@tanstack/db
@tanstack/db-ivm
@tanstack/electric-db-collection
@tanstack/query-db-collection
@tanstack/react-db
@tanstack/rxdb-db-collection
@tanstack/solid-db
@tanstack/svelte-db
@tanstack/trailbase-db-collection
@tanstack/vue-db
commit: |
Size Change: +2.97 kB (+3.95%) Total Size: 78.2 kB
ℹ️ View Unchanged
|
Size Change: 0 B Total Size: 1.44 kB ℹ️ View Unchanged
|
} from "./types.js" | ||
|
||
export type RunCountUtils = UtilsRecord & { | ||
getRunCount: () => number |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added a utils.getRunCount()
in order to check this in the tests easily. We should extend this with more run metadata and timing, will be useful for dev tools.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a close look at this PR and left some comments. My main concern is the time complexity of the scheduler's flush
method.
"@tanstack/db": patch | ||
--- | ||
|
||
Add a scheduler that ensures that if a transaction touches multiple collections that feed into a single live query, the liver query only emits a single batch of updates. This fixes an issue where multiple renders could be triggered from a live query under this situation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: the liver query
const collection = bridgeToCreateCollection(options) | ||
|
||
if (config.utils) { | ||
Object.assign(collection.utils, config.utils) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is weird. Why do we need to mutate it after the facts instead of passing it as an option when calling bridgeToCreateCollection
as we used to do before.
): Collection<TResult, string | number, TUtils> { | ||
// This is the only place we need a type assertion, hidden from user API | ||
return createCollection(options as any) as unknown as Collection< | ||
function bridgeToCreateCollection<TResult extends object>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why removing the TUtils
type parameter? Because of that the return type is now less precise and we have to explicitly cast it on L140.
private readonly aliasDependencies: Record< | ||
string, | ||
Array<CollectionConfigBuilder<any, any>> | ||
> = Object.create(null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need Object.create(null)
? Can't we just use an object literal {}
?
} | ||
} catch (error) { | ||
allDone = false | ||
if (firstError === undefined) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can simplify to firstError ??= error
import type { CollectionConfigBuilder } from "./collection-config-builder.js" | ||
import type { CollectionSubscription } from "../../collection/subscription.js" | ||
|
||
const loadMoreCallbackSymbol = Symbol(`tanstack.db.loadMore`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I propose we change this one to @tanstack/db.loadMore
to be consistent with the naming we used in collection-registry.ts
:
const BUILDER_SYMBOL = Symbol.for(`@tanstack/db.collection-config-builder`)
|
||
// Do not provide the callback that loads more data | ||
// if there's no more data to load | ||
// otherwise we end up in an infinite loop trying to load more data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is useful to keep
|
||
// We need to call `maybeRunGraph` even if there's no data to load | ||
// because we need to mark the collection as ready if it's not already | ||
// and that's only done in `maybeRunGraph` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is also useful to keep but may need to be changed slightly (at least the references to maybeRunGraph
need to be changed to scheduleGraphRun
)
const subscriptionWithLoader = subscription as SubscriptionWithLoader | ||
|
||
const boundLoader = | ||
subscriptionWithLoader[loadMoreCallbackSymbol] ?? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If i'm not mistaked, the trick we do with ?? here is exactly what ??=
does so we can rewrite this as:
subscriptionWithLoader[loadMoreCallbackSymbol] ??=
this.loadMoreIfNeeded.bind(this, subscription)
this.loadMoreIfNeeded.bind(this, subscription) | ||
) | ||
|
||
// Cache the bound loader on the subscription using a symbol property. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"bound loader" is quite cryptic.
stacked on #625
Live Query Scheduler Overview
Overview
This change introduces a scoped, dependency-aware scheduler that guarantees every live-query builder runs at most once per transaction, even when multiple source collections or derived queries fire in the same
mutate
call. The scheduler groups work by transaction id, dedupes entries by theCollectionConfigBuilder
instance, tracks dependencies between builders, and flushes synchronously whenTransaction.mutate
exits. The net effect: optimistic updates from a single transaction coalesce into a single graph run for each live query, so downstream frameworks see one change batch per transaction.How scheduling works
CollectionSubscriber
immediately forwards changes to the D2 input and callsCollectionConfigBuilder.scheduleGraphRun
.scheduleGraphRun
records which upstream builders the current builder depends on (based on the collections it subscribed to) and hands a job to the shared scheduler with:contextId
: the transaction id, grouping all work triggered by that transaction.jobId
: the builder instance; repeated schedules for the same builder merge into a single entry.dependencies
: the set of upstream builders that must finish before this builder can run.When the graph actually runs
Transaction.mutate
, we callregisterTransaction
before the user’s callback andunregisterTransaction
in afinally
block afterward.registerTransaction
clears any stale entries from previous failed scopes.unregisterTransaction
callsscheduler.flush(tx.id)
.flush
now loops while a context map exists, so if a job enqueues additional work during its own execution (e.g., the join scheduling itself after its parents run), the scheduler immediately picks up the new entry before leaving the transaction scope.scheduleGraphRun
runsmaybeRunGraph
immediately (backward compatible behaviour).Nested live queries (the “diamond” cases)
liveQueryA
and rawcollectionB
) behaves the same way: even ifcollectionB
fires first, the join job is deferred untilliveQueryA
has finished.Order-by loaders and batching
maybeRunGraph
loop keeps callinggraph.run()
whilependingWork()
is true. If an order-by loader requests more rows, it pushes those changes into the same D2 input, triggeringpendingWork()
again.CollectionConfigBuilder.isGraphRunning
istrue
, so any nested call toscheduleGraphRun
is ignored; the loader’s extra rows are consumed by the ongoing loop.Tests
scheduler.test.ts
now covers:mutate
).getRunCount
).Each of the diamond tests mutates once and then performs another transaction with updates, demonstrating that we still emit exactly one additional batch—no duplicates, no missed runs—and that
getRunCount()
only increments once per transaction.Summary
Transaction.mutate
clears before and flushes after every scope, so everything runs synchronously in the optimistic phase.flush
loops until no jobs remain, ensuring child live queries (joins) run exactly once per transaction after their parents.